package com.fwmagic.dynamic_rule.functions;

import com.fwmagic.dynamic_rule.bean.LogBean;
import com.fwmagic.dynamic_rule.bean.ResultBean;
import com.fwmagic.dynamic_rule.bean.RuleParam;
import com.fwmagic.dynamic_rule.service.QueryRouterServiceV3;
import com.fwmagic.dynamic_rule.utils.RuleSimulator;
import com.fwmagic.dynamic_rule.utils.SystemPrintUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Core rule-processing function, V3 (Flink State &amp;&amp; ClickHouse).
 *
 * <p>Every incoming {@link LogBean} is appended to a {@link ListState} whose entries are
 * configured with a two-hour TTL. When the event id equals the trigger event id of the
 * current rule, the profile, action-count and action-sequence checks are delegated, in
 * that order, to {@link QueryRouterServiceV3}; a {@link ResultBean} is emitted only when
 * all three checks pass.
 */
public class RuleProcessFunctionV3 extends KeyedProcessFunction<String, LogBean, ResultBean> {

    private ListState<LogBean> listState;

    private QueryRouterServiceV3 queryRouterServiceV3;

    /**
     * Creates the query router and registers the detail-event list state.
     *
     * @param parameters configuration handed in by the runtime (not read here)
     * @throws Exception if the service or the state cannot be initialised
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        queryRouterServiceV3 = new QueryRouterServiceV3();

        // State holding the raw detail events.
        ListStateDescriptor<LogBean> eventsDescriptor =
                new ListStateDescriptor<>("events", LogBean.class);

        // Entries expire two hours after they were created or last written.
        eventsDescriptor.enableTimeToLive(
                StateTtlConfig.newBuilder(Time.hours(2))
                        .updateTtlOnCreateAndWrite()
                        .build());

        listState = getRuntimeContext().getListState(eventsDescriptor);
    }

    /**
     * Stores the event, then evaluates the rule when the event is the rule's trigger.
     *
     * @param logBean   the incoming event
     * @param context   processing context (not used here)
     * @param collector receives a {@link ResultBean} when every rule condition matches
     * @throws Exception propagated from state access or from the query service
     */
    @Override
    public void processElement(LogBean logBean, Context context, Collector<ResultBean> collector) throws Exception {
        // Record the event in the detail-event state before any rule evaluation.
        listState.add(logBean);

        // Fetch the rule parameters.
        RuleParam ruleParam = RuleSimulator.getRuleParam();

        // Step 1: only the rule's trigger event starts an evaluation.
        boolean isTriggerEvent = ruleParam.getTriggerParam().getEventId().equals(logBean.getEventId());
        if (!isTriggerEvent) {
            return;
        }

        // Step 2: user-profile condition.
        boolean profileMatched = queryRouterServiceV3.userProfileQuery(logBean, ruleParam);
        if (!profileMatched) {
            return;
        }
        SystemPrintUtils.printLog("规则计算被触发:" + logBean.getDeviceId() + ", " + logBean.getEventId());

        // Step 3: user action-count condition.
        SystemPrintUtils.printLog("查询用户行为次数条件");
        boolean countMatched = queryRouterServiceV3.userActionCountQuery(logBean, listState, ruleParam);
        if (!countMatched) {
            return;
        }
        SystemPrintUtils.printLog("用户行为次数条件匹配！！！");

        // Step 4: user action-sequence condition.
        SystemPrintUtils.printLog("查询用户行为事件序列条件");
        boolean sequenceMatched = queryRouterServiceV3.userActionSequenceQuery(logBean, listState, ruleParam);
        if (!sequenceMatched) {
            return;
        }
        SystemPrintUtils.printLog("用户行为序列条件匹配！！！");

        // Every condition matched: emit the result for this rule.
        collector.collect(buildResult(logBean, ruleParam));
    }

    /** Builds the result record from the triggering event and the matched rule. */
    private static ResultBean buildResult(LogBean logBean, RuleParam ruleParam) {
        ResultBean matched = new ResultBean();
        matched.setTimeStamp(logBean.getTimeStamp());
        matched.setDeviceId(logBean.getDeviceId());
        matched.setRuleId(ruleParam.getRuleId());
        return matched;
    }
}
